Skip to content

[SYSTEMDS-3949] Add native Delta Lake frame read/write via Delta Kernel#2515

Open
Baunsgaard wants to merge 10 commits into
apache:mainfrom
Baunsgaard:delta-frame-io
Open

[SYSTEMDS-3949] Add native Delta Lake frame read/write via Delta Kernel#2515
Baunsgaard wants to merge 10 commits into
apache:mainfrom
Baunsgaard:delta-frame-io

Conversation

@Baunsgaard

@Baunsgaard Baunsgaard commented Jun 25, 2026

Copy link
Copy Markdown
Contributor

Extend the native Delta Lake support (#2511) from matrices to frames, reading and
writing Delta Lake tables through the Spark-free Delta Kernel library on the
single-node CP path. DML read/write with format="delta" now works for frames,
discovering schema, column names, and dimensions directly from the table.

Stacked on #2511 and should merge after it. Append/overwrite semantics, distributed
execution, and time travel remain out of scope.

Frame read/write

  • FrameReaderDelta / FrameWriterDelta: column-at-a-time materialization into typed
    frame Arrays (no per-cell boxing on the read path), wired into
    FrameReader/WriterFactory for format="delta".
  • FrameReaderDeltaParallel: decodes one task per parquet data file on the common
    thread pool (the parquet decode dominates a Delta read), falling back to the serial
    reader for single-file tables. Selected iff parallel CP read is enabled.
  • Two read strategies: a direct fast path that pre-sizes one array per column from
    the per-file numRecords statistics and decodes straight into each row offset, and a
    buffered fallback (per-batch decode + concatenate) for tables without exact row
    counts or with deletion vectors. Both guard against files that decode more or fewer
    rows than their statistic claims.
  • FrameObject now refreshes the cached metadata/schema after a Delta read (dimensions
    and schema are discovered at read time) and counts the HDFS write for statistics.

Adaptive writer file sizing

  • New sysds.delta.writer.adaptive.filesize (default true): the writer targets
    roughly one data file per expected parallel reader — clamped to the configured
    sysds.delta.writer.targetfilesize cap and a 4MB floor — so the per-file parallel
    read can use all threads. Applies to both the matrix (WriterDelta) and frame writers
    via shared helpers in DeltaKernelUtils; set the flag to false for a fixed target.

Shared column allocation

  • ArrayFactory gains allocateBacking(ValueType, int) and makes create(ValueType, Object) the single place that wraps a raw backing array into a frame Array,
    bit-packing large boolean columns into BitSetArray (consistent with other readers)
    and mapping UINT4/UINT8 to INT32. The Delta readers reuse these primitives
    instead of duplicating per-type allocation logic.

Tests

  • Direct round-trip tests (DeltaFrameReadWriteTest) covering mixed/large/multi-batch
    frames, string nulls, empty tables, short/byte coercion, non-mappable-type rejection,
    and serial-vs-parallel / direct-vs-buffered equivalence across multi-file tables.
  • Cross-engine interop (DeltaFrameSparkInteropTest): SystemDS↔Spark round-trips,
    multi-file reads, and Spark-written tables with deletion vectors.
  • DML-level read/write tests (FrameDeltaReadWriteTest).
  • Manual, @Ignored read benchmarks (DeltaFrameReadPerf, performance/DeltaFrameRead)
    for profiling the read path; not run in CI.

@codecov

codecov Bot commented Jun 25, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 89.02439% with 45 lines in your changes missing coverage. Please review.
✅ Project coverage is 71.69%. Comparing base (384a8dc) to head (682a208).
⚠️ Report is 6 commits behind head on main.

Files with missing lines Patch % Lines
.../org/apache/sysds/runtime/io/FrameReaderDelta.java 87.57% 7 Missing and 15 partials ⚠️
...che/sysds/runtime/io/FrameReaderDeltaParallel.java 89.52% 7 Missing and 4 partials ⚠️
.../org/apache/sysds/runtime/io/FrameWriterDelta.java 89.33% 4 Missing and 4 partials ⚠️
...ds/runtime/controlprogram/caching/FrameObject.java 71.42% 1 Missing and 1 partial ⚠️
.../org/apache/sysds/runtime/io/DeltaKernelUtils.java 88.88% 1 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #2515      +/-   ##
============================================
+ Coverage     71.56%   71.69%   +0.13%     
- Complexity    49110    49446     +336     
============================================
  Files          1575     1583       +8     
  Lines        189793   190908    +1115     
  Branches      37235    37436     +201     
============================================
+ Hits         135816   136864    +1048     
- Misses        43480    43498      +18     
- Partials      10497    10546      +49     

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Extend the native Delta Lake support from matrices to frames, reading and
writing Delta Lake tables through the Spark-free Delta Kernel library on the
single-node CP path. DML read/write with format="delta" now works for
frames, discovering schema, column names, and dimensions directly from the
table.

- Add FrameReaderDelta, FrameReaderDeltaParallel and FrameWriterDelta
- Wire DELTA into the frame reader and writer factories
- Refresh cached frame metadata and schema after a Delta read
- Broaden Delta frame component IO coverage

Stacked on the matrix Delta support; append/overwrite semantics,
distributed execution, and time travel remain out of scope.
The native Delta read decode is CPU-bound and parallelizes per data
file, so a table written as one large file cannot use more than one
reader thread. Size data files toward roughly one file per expected
parallel reader, capped by the configured target and floored to avoid
tiny-file proliferation. This materially improves parallel-read
throughput for both matrix and frame tables.

- Add the sysds.io.delta.writer.adaptivefilesize config (default true)
  plus adaptiveWriterTargetFileSize/createWriteEngine helpers in
  DeltaKernelUtils, and document the target file size as an upper bound
- Wire FrameWriterDelta and WriterDelta to size files from the block's
  estimated bytes (dense double footprint for matrices)
- Use the configurable DELTA_WRITER_BATCH_SIZE in FrameWriterDelta
  instead of a hardcoded batch size, matching the matrix writer
The parallel frame reader's metadata-direct path wrote each data file's
rows into shared per-column arrays at a fixed offset without bounding the
row count, so a table whose per-file numRecords statistic under-counts the
actual rows (possible for externally written Delta tables) could overrun
its slice into the next file's region under concurrent writes.

- Add the per-file row-count overflow guard in FrameReaderDeltaParallel
  .readDirect, matching the matrix reader: fail fast with a clear message
  instead of risking overlapping concurrent writes or an array overrun
- Reuse DeltaKernelUtils.typeCode/T_* in FrameReaderDelta instead of a
  forked R_* table and instanceof cascade, keeping the frame and matrix
  type dispatch in lockstep; drop the now-unused type imports
- Extract awaitFileTasks in FrameReaderDeltaParallel to share the pool
  lifecycle across both read paths and restore the interrupt flag when a
  parallel read is cancelled
- Add a unit test covering the adaptive target-file-size flag on/off and
  the floor/cap clamp boundaries
- Clarify the adaptive-size javadoc floor wording, the createWriteEngine
  batch-size comment, and rename opaque locals (names2, bcs/bss)
The single-threaded frame reader extracted each batch into temporary
per-batch arrays and then concatenated them into the final column arrays,
allocating and copying every column twice. Delta's per-file numRecords
statistic already provides exact row counts from metadata, so the output
can be sized up front and decoded in a single pass with no intermediate
buffers.

- Rewrite FrameReaderDelta.readFrameFromHDFS to pre-size one typed array
  per column from the metadata row counts and decode each data file
  straight into its row offset (with the same per-file overflow guard as
  the parallel reader); fall back to the buffered extract-then-concatenate
  path only when exact counts are unavailable (missing statistics or
  deletion vectors present)
- Move allocColumn/createColumn/extractColumnInto up to FrameReaderDelta so
  the serial and parallel readers share one copy instead of duplicating the
  per-type column dispatch
- The parallel reader's single-file/low-thread fallback now also decodes in
  a single direct pass
Route the Delta frame readers' column allocation and wrapping through
ArrayFactory instead of reader-local per-type switches:

- Add ArrayFactory.allocateBacking(ValueType,int) as the single
  ValueType -> raw backing array mapping (the inverse of
  create(ValueType,Object)), and remove the duplicate allocColumn /
  createColumn / extractColumn / buildColumn switches from
  FrameReaderDelta and FrameReaderDeltaParallel. The buffered fallback
  now reuses the same alloc + extractColumnInto + concatColumn
  primitives as the direct path.
- Make create(ValueType,Object) bit-pack boolean columns above the
  switch point into a BitSetArray (mirroring allocateBoolean), so Delta
  reads produce the same compact representation as every other frame
  reader instead of a byte-backed BooleanArray.
- Simplify allocate(ValueType,int) to compose create + allocateBacking,
  keeping only the boolean special case (empty BitSet backing) and
  moving the UINT4/UINT8 fallback warning into allocateBacking.
- Move useDirectPath to FrameReaderDelta so both readers share it.
Add timing/throughput benchmarks for the serial and parallel Delta
frame readers:

- DeltaFrameRead in the performance suite (dispatched as Main id 18):
  writes a random frame to a temp Delta table once as untimed setup,
  then repeatedly reads it back under timing for serial/parallel/both
  modes with a configurable writer target file size. Suitable for
  running under async-profiler.
- DeltaFrameReadPerf: JUnit-based manual micro-benchmarks (all @ignore
  so they never run in the normal build) covering direct-vs-buffered
  serial reads, adaptive file sizing, target-size and batch-size sweeps,
  and schema-composition breakdowns.
Reformat the new Delta frame reader/writer and test files with the
project Eclipse style (dev/CodeStyle_eclipse.xml). Whitespace and layout
only, no behavioral changes.
- Replace the bespoke genMixedFrame generator in DeltaFrameReadWriteTest
  with TestUtils.generateRandomFrameBlock, removing duplicated random
  frame generation code.
- Add DeltaFrameSparkInteropTest exercising cross-engine round-trips
  against the reference Delta Spark connector: SystemDS-written multi-file
  frame read by Spark, Spark-written multi-file frame read by the serial
  and parallel SystemDS readers, and a Spark table with deletion vectors
  read via the buffered selection-mask path. Comparisons are keyed by a
  unique id column so no row order is assumed.
- Fail loud when a data file decodes fewer rows than its numRecords
  statistic (silent underflow) in both the serial and parallel direct
  read paths, alongside the existing overflow guard, and compute the
  selected-row count once per batch.
- Extract a shared ReadPlan (read codes, value types, names) and a
  readWithHandle entry point so the schema derivation lives in one place
  and the parallel reader reuses its already-opened scan handle for the
  single-file fallback instead of re-opening the snapshot.
- Move the null-result check above the metadata refresh in FrameObject so
  it can actually fire before data is dereferenced.
- Log the adaptive writer file-size decision at debug level and document
  the INT64 boxing and null precondition on the frame writer.
- Consolidate duplicated countParquet test logic into DeltaFrameTestUtils,
  add a loan-pattern helper for multi-file tables, assert on exception
  messages, and use assertEquals for per-cell comparisons.
- Remove the redundant DeltaFrameReadPerf micro-benchmark.
…elUtils

The prior commit ran the Eclipse formatter over the entire files, reflowing
many lines this PR does not touch. Restore both files to a minimal diff that
carries only the intended changes: the null-result check moved above the
metadata refresh in FrameObject, and the debug log for the adaptive writer
file-size decision in DeltaKernelUtils.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

Status: In Progress

Development

Successfully merging this pull request may close these issues.

1 participant